package com.cs.test.config;

import com.cs.test.common.Constant;
import com.cs.test.service.MsgLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author james
 */
@Configuration
@Slf4j
public class RabbitConfig {

    /**
     * 发送邮件
     */
    public static final String MAIL_QUEUE_NAME = "mail.queue";
    public static final String MAIL_EXCHANGE_NAME = "mail.exchange";
    public static final String MAIL_ROUTING_KEY_NAME = "mail.routing.key";

    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Autowired
    private MsgLogService msgLogService;

    /**
     * // TODO
     * 消息转换器,说白了就是json转换为对象???
     */
    @Bean
    public Jackson2JsonMessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 设置消息转换器
        rabbitTemplate.setMessageConverter(converter());
        // 消息是否成功发送到Exchange
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                String msgId = correlationData.getId();
                log.info("id为{}消息成功发送到Exchange,开始修改其对应的状态", msgId);
                /**
                 因为在controler中发送完成消息后,在log记录插入一条记录, 这里将
                 记录的状态修改为已经更新为已经投递
                 */
                msgLogService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_SUCCESS);
            } else {
                log.info("消息发送到Exchange失败,{},cause:{}", correlationData, cause);
            }
        });
        // 触发setReturnCallback回调必须设置为mandatory=true, 否则Exchange没有找到Queue就会丢弃掉,而不会触发回调
        rabbitTemplate.setMandatory(true);
        // 消息是否从Exchange路由到Queue, PS:这是一个失败回调,只有消息从Exchange路由到Queue失败才会回调这个方法
        rabbitTemplate.setReturnCallback((message, replyCode, replyText,
                                          exchange, routingKey) -> {
            log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}",
                    exchange, routingKey, replyCode, replyText, message);
        });
        return rabbitTemplate;
    }

    /**
     * declare  email交换机
     */
    public DirectExchange emailExchange() {
        return new DirectExchange(MAIL_EXCHANGE_NAME, true, false);
    }

    /**
     * 声明 email队列
     */
    @Bean
    public Queue mailQueue() {
        return new Queue(MAIL_QUEUE_NAME, true);
    }

    /**
     * email队列和mail交换机绑定
     */
    @Bean
    public Binding mailBinding() {
        return BindingBuilder.bind(mailQueue()).to(emailExchange()).with(MAIL_ROUTING_KEY_NAME);
    }
}
